/*
 * (C) Copyright 2019-2020, by Semen Chudakov and Contributors.
 *
 * JGraphT : a free Java graph-theory library
 *
 * See the CONTRIBUTORS.md file distributed with this work for additional
 * information regarding copyright ownership.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the
 * GNU Lesser General Public License v2.1 or later
 * which is available at
 * http://www.gnu.org/licenses/old-licenses/lgpl-2.1-standalone.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR LGPL-2.1-or-later
 */
package org.jgrapht.alg.shortestpath;

import org.jgrapht.Graph;
import org.jgrapht.Graphs;
import org.jgrapht.alg.util.Pair;
import org.jgrapht.graph.MaskSubgraph;
import org.jgrapht.graph.builder.GraphTypeBuilder;
import org.jgrapht.util.ConcurrencyUtil;
import org.jheaps.AddressableHeap;
import org.jheaps.tree.PairingHeap;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Parallel implementation of the <a href="https://en.wikipedia.org/wiki/Contraction_hierarchies">
 * contraction hierarchy route planning precomputation technique</a>.
 *
 * <p>
 * The original algorithm is described in the article: Robert Geisberger, Peter Sanders, Dominik
 * Schultes, and Daniel Delling. 2008. Contraction hierarchies: faster and simpler hierarchical
 * routing in road networks. In Proceedings of the 7th international conference on Experimental
 * algorithms (WEA'08), Catherine C. McGeoch (Ed.). Springer-Verlag, Berlin, Heidelberg, 319-333.
 *
 * <p>
 * Parallel version of the algorithm is described in the article: Vetter, Christian. "Parallel
 * Time-Dependent Contraction Hierarchies." (2009).
 *
 * <p>
 * This algorithm speeds up shortest paths computation by contracting graph vertices. To contract a
 * vertex means to remove it from the graph in such a way that shortest paths in the remaining
 * overlay graph are preserved. This property is achieved by replacing paths of the form $\langle u,
 * v, w\rangle$ by a shortcut edge $(u, w)$. Note that the shortcut $(u, w)$ is only required if
 * $\langle u, v, w\rangle$ is the only shortest path from $u$ to $w$.
 *
 * <p>
 * Contraction is performed as follows. First a priority is computed for each vertex in the graph.
 * This implementation uses edge quotient, complexity quotient and hierarchical depth metrics for
 * computing priority. A hierarchy is then generated by iteratively contracting independent sets of
 * vertices. A vertex is independent iff its priority is less than the priority of every vertex in
 * its 2-neighbourhood. A 2-neighbourhood of a vertex $v$ is defined as a set of vertices that are
 * reachable from $v$ using at most 2 hops. After contraction each vertex gets its unique
 * contraction level - its position in the computed hierarchy. Finally, after all vertices are
 * contracted each edge is set to be either upward if its source has lower level than its target, or
 * downward if vice versa.
 *
 * <p>
 * Computing initial priorities, independent sets and shortcuts, updating neighbours priorities and
 * marking upward edges are performed in parallel, which gives this implementation a performance
 * speedup compared to the sequential approach.
 *
 * <p>
 * For parallelization, this implementation relies on the {@link ThreadPoolExecutor}
 * which is supplied to this algorithm from outside.
 *
 * @param <V> the graph vertex type
 * @param <E> the graph edge type
 * @author Semen Chudakov
 */
public class ContractionHierarchyPrecomputation<V, E>
{
    /**
     * The underlying graph.
     */
    private Graph<V, E> graph;

    /**
     * Graph that stores the computed contraction hierarchy.
     */
    private Graph<ContractionVertex<V>, ContractionEdge<E>> contractionGraph;
    /**
     * Mapping of the vertices in the original graph to the vertices in the contraction hierarchy
     * graph.
     */
    private Map<V, ContractionVertex<V>> contractionMapping;
    /**
     * The immutable view of the {@code contractionGraph} which masks already contracted vertices.
     * It is used to determine overlay graph during the computations.
     */
    private Graph<ContractionVertex<V>, ContractionEdge<E>> maskedContractionGraph;

    /**
     * Vertices of the {@code contractionGraph}.
     */
    private List<ContractionVertex<V>> vertices;
    /**
     * Lists of shortcuts that correspond to vertices in the {@code contractionGraph}. The id of a
     * vertex is the position in this list, where corresponding shortcuts are stored.
     */
    private List<List<Pair<ContractionEdge<E>, ContractionEdge<E>>>> shortcutEdges;
    /**
     * Data of each vertex in the {@code contractionGraph}. The id of a vertex is the position in
     * this list, where corresponding data is stored.
     */
    private List<VertexData> verticesData;

    /**
     * Counter of contraction level that should be assigned to vertex that is being contracted.
     * Variable is made atomic due to the concurrent updates in the computations.
     */
    private AtomicInteger contractionLevelCounter;

    /**
     * Supplier for the preferable heap implementation.
     */
    private Supplier<AddressableHeap<Double, ContractionVertex<V>>> shortcutsSearchHeapSupplier;

    /**
     * Decorator for {@link ThreadPoolExecutor} supplied to this algorithm that enables to
     * keep track of when all submitted tasks are finished.
     */
    private ExecutorCompletionService<Void> completionService;
    /**
     * Maximum number of threads used in the computations.
     */
    private int parallelism;

    /**
     * Tasks that are submitted to the {@code executor}.
     */
    private List<ContractionTask> tasks;

    /**
     * Consumers that perform computation of initial priorities for vertices in
     * {@code contractionGraph}. Each consumer holds an instance of the {@link Random} class to
     * avoid concurrent calls to single instance.
     */
    private List<Consumer<ContractionVertex<V>>> computeInitialPrioritiesConsumers;
    /**
     * Computes independent set during contraction.
     */
    private Consumer<ContractionVertex<V>> computeIndependentSetConsumer;
    /**
     * Computes shortcuts for a vertex.
     */
    private Consumer<ContractionVertex<V>> computeShortcutsConsumer;
    /**
     * Updates neighbours priorities of a vertex.
     */
    private Consumer<ContractionVertex<V>> updateNeighboursConsumer;
    /**
     * Sets value of {@code isUpward} for the outgoing edges of a vertex.
     */
    private Consumer<ContractionVertex<V>> markUpwardEdgesConsumer;

    /**
     * Constructs a new instance of the algorithm for a given {@code graph}. Delegates to
     * {@link #ContractionHierarchyPrecomputation(Graph, int)}, using the number of processors
     * reported by {@link Runtime#availableProcessors()} as the parallelism.
     *
     * @param graph graph
     * @deprecated replaced with {@link #ContractionHierarchyPrecomputation(Graph, ThreadPoolExecutor)}
     */
    @Deprecated
    public ContractionHierarchyPrecomputation(Graph<V, E> graph)
    {
        this(graph, Runtime.getRuntime().availableProcessors());
    }

    /**
     * Constructs a new instance of the algorithm for a given {@code graph} and {@code executor}.
     * A fresh {@link Random} is created through {@code Random::new} for each random generator
     * that is requested. It is up to a user of this algorithm to handle the creation and
     * termination of the provided {@code executor}. For utility methods to manage a
     * {@code ThreadPoolExecutor} see {@link ConcurrencyUtil}.
     *
     * @param graph graph
     * @param executor executor which will be used for parallelization
     */
    public ContractionHierarchyPrecomputation(Graph<V, E> graph, ThreadPoolExecutor executor)
    {
        this(graph,  Random::new, executor);
    }

    /**
     * Constructs a new instance of the algorithm for a given {@code graph} and {@code parallelism}.
     * Uses {@code Random::new} as the supplier of random generators and {@code PairingHeap::new}
     * as the supplier of heaps.
     *
     * @param graph graph
     * @param parallelism maximum number of threads used in the computations
     * @deprecated replaced with {@link #ContractionHierarchyPrecomputation(Graph, ThreadPoolExecutor)}
     */
    @Deprecated
    public ContractionHierarchyPrecomputation(Graph<V, E> graph, int parallelism)
    {
        this(graph, parallelism, Random::new, PairingHeap::new);
    }


    /**
     * Constructs a new instance of the algorithm for a given {@code graph} and
     * {@code randomSupplier}. The provided {@code randomSupplier} should return a different random
     * generator instance on every call, because the instances are used by different threads. The
     * number of processors reported by {@link Runtime#availableProcessors()} is used as the
     * parallelism.
     *
     * @param graph graph
     * @param randomSupplier supplier for preferable instances of {@link Random}
     * @deprecated replaced with {@link #ContractionHierarchyPrecomputation(Graph, Supplier, ThreadPoolExecutor)}
     */
    @Deprecated
    public ContractionHierarchyPrecomputation(Graph<V, E> graph, Supplier<Random> randomSupplier)
    {
        this(graph, Runtime.getRuntime().availableProcessors(), randomSupplier);
    }

    /**
     * Constructs a new instance of the algorithm for a given {@code graph}, {@code randomSupplier}
     * and {@code executor}. The provided {@code randomSupplier} should return a different random
     * generator instance on every call, because the instances are used by different threads.
     * {@code PairingHeap::new} is used as the supplier of heaps. It is up to a user of this
     * algorithm to handle the creation and termination of the provided {@code executor}. For
     * utility methods to manage a {@code ThreadPoolExecutor} see {@link ConcurrencyUtil}.
     *
     * @param graph graph
     * @param randomSupplier supplier for preferable instances of {@link Random}
     * @param executor executor which will be used for parallelization
     */
    public ContractionHierarchyPrecomputation(Graph<V, E> graph, Supplier<Random> randomSupplier, ThreadPoolExecutor executor)
    {
        this(graph, randomSupplier, PairingHeap::new, executor);
    }

    /**
     * Constructs a new instance of the algorithm for a given {@code graph}, {@code parallelism} and
     * {@code randomSupplier}. The provided {@code randomSupplier} should return a different random
     * generator instance on every call, because the instances are used by different threads.
     * {@code PairingHeap::new} is used as the supplier of heaps.
     *
     * @param graph graph
     * @param parallelism maximum number of threads used in the computations
     * @param randomSupplier supplier for preferable instances of {@link Random}
     * @deprecated replaced with {@link #ContractionHierarchyPrecomputation(Graph, Supplier, ThreadPoolExecutor)}
     */
    @Deprecated
    public ContractionHierarchyPrecomputation(
        Graph<V, E> graph, int parallelism, Supplier<Random> randomSupplier)
    {
        this(graph, parallelism, randomSupplier, PairingHeap::new);
    }

    /**
     * Constructs a new instance of the algorithm for a given {@code graph}, {@code parallelism},
     * {@code randomSupplier} and {@code shortcutsSearchHeapSupplier}. The provided
     * {@code randomSupplier} should return a different random generator instance on every call,
     * because the instances are used by different threads.
     *
     * <p>
     * The executor used for parallelization is created here through
     * {@link ConcurrencyUtil#createThreadPoolExecutor(int)}.
     *
     * @param graph graph
     * @param parallelism maximum number of threads used in the computations
     * @param randomSupplier supplier for preferable instances of {@link Random}
     * @param shortcutsSearchHeapSupplier supplier for the preferable heap implementation.
     * @deprecated replaced with {@link #ContractionHierarchyPrecomputation(Graph, Supplier, Supplier, ThreadPoolExecutor)}
     */
    @Deprecated
    public ContractionHierarchyPrecomputation(
        Graph<V, E> graph, int parallelism, Supplier<Random> randomSupplier,
        Supplier<AddressableHeap<Double, ContractionVertex<V>>> shortcutsSearchHeapSupplier)
    {
        // NOTE(review): the executor created here is not kept in a dedicated field and nothing
        // visible in this part of the file shuts it down - confirm that its threads are
        // terminated somewhere, otherwise they may outlive the computation.
        init(graph, randomSupplier, shortcutsSearchHeapSupplier, ConcurrencyUtil.createThreadPoolExecutor(parallelism));
    }

    /**
     * Constructs a new instance of the algorithm for a given {@code graph},
     * {@code randomSupplier}, {@code shortcutsSearchHeapSupplier} and {@code executor}. The
     * provided {@code randomSupplier} should return a different random generator instance on every
     * call, because the instances are used by different threads. It is up to a user of this
     * algorithm to handle the creation and termination of the provided {@code executor}. For
     * utility methods to manage a {@code ThreadPoolExecutor} see {@link ConcurrencyUtil}.
     *
     * @param graph graph
     * @param randomSupplier supplier for preferable instances of {@link Random}
     * @param shortcutsSearchHeapSupplier supplier for the preferable heap implementation.
     * @param executor executor which will be used for parallelization
     */
    public ContractionHierarchyPrecomputation(
            Graph<V, E> graph, Supplier<Random> randomSupplier,
            Supplier<AddressableHeap<Double, ContractionVertex<V>>> shortcutsSearchHeapSupplier,
            ThreadPoolExecutor executor)
    {
        init(graph, randomSupplier, shortcutsSearchHeapSupplier, executor);
    }

    /**
     * Initializes the fields of this algorithm.
     *
     * <p>
     * All arguments are validated eagerly, so that a {@code null} argument is reported with a
     * descriptive message at construction time instead of surfacing later (possibly inside a
     * worker thread) as an anonymous {@link NullPointerException}.
     *
     * <p>
     * The parallelism is taken from {@link ThreadPoolExecutor#getMaximumPoolSize()}; one
     * {@code ContractionTask} and one initial-priority consumer (each holding its own
     * {@link Random} obtained from {@code randomSupplier}) are created per unit of parallelism.
     *
     * @param graph                       a graph
     * @param randomSupplier              supplier for preferable instances of {@link Random}
     * @param shortcutsSearchHeapSupplier supplier for the preferable heap implementation.
     * @param executor                    executor which will be used for parallelization
     * @throws NullPointerException if any of the arguments is {@code null}
     */
    private void init(Graph<V, E> graph, Supplier<Random> randomSupplier,
                      Supplier<AddressableHeap<Double, ContractionVertex<V>>> shortcutsSearchHeapSupplier,
                      ThreadPoolExecutor executor){
        this.graph = Objects.requireNonNull(graph, "graph cannot be null");
        Objects.requireNonNull(randomSupplier, "randomSupplier cannot be null");
        this.shortcutsSearchHeapSupplier = Objects.requireNonNull(
                shortcutsSearchHeapSupplier, "shortcutsSearchHeapSupplier cannot be null");
        Objects.requireNonNull(executor, "executor cannot be null");

        this.contractionGraph = GraphTypeBuilder
                .<ContractionVertex<V>, ContractionEdge<E>> directed().weighted(true)
                .allowingMultipleEdges(false).allowingSelfLoops(false).buildGraph();
        this.parallelism = executor.getMaximumPoolSize();

        // the per-vertex lists are indexed by vertex id, so they are pre-filled with nulls
        final int vertexCount = graph.vertexSet().size();
        vertices = new ArrayList<>(vertexCount);
        shortcutEdges = new ArrayList<>(Collections.nCopies(vertexCount, null));
        verticesData = new ArrayList<>(Collections.nCopies(vertexCount, null));

        contractionLevelCounter = new AtomicInteger();

        // overlay view: hides every vertex whose data exists and is flagged as contracted;
        // no edge is masked explicitly
        maskedContractionGraph = new MaskSubgraph<>(
                contractionGraph,
                v -> verticesData.get(v.vertexId) != null && verticesData.get(v.vertexId).isContracted,
                e -> false);
        contractionMapping = new HashMap<>();

        completionService = new ExecutorCompletionService<>(executor);

        tasks = new ArrayList<>(parallelism);
        computeInitialPrioritiesConsumers = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; ++i) {
            tasks.add(new ContractionTask(i));
            computeInitialPrioritiesConsumers.add(new Consumer<>() {
                // every consumer owns its generator, so no Random instance is shared
                private final Random random = randomSupplier.get();

                @Override
                public void accept(ContractionVertex<V> vertex) {
                    verticesData.set(vertex.vertexId, getVertexData(vertex, random.nextInt()));
                }
            });
        }

        computeIndependentSetConsumer =
                vertex -> verticesData.get(vertex.vertexId).isIndependent = vertexIsIndependent(vertex);
        computeShortcutsConsumer =
                vertex -> shortcutEdges.set(vertex.vertexId, getShortcuts(vertex));
        updateNeighboursConsumer = this::updateNeighboursData;
        // an edge is upward iff its source has a lower contraction level than its target
        markUpwardEdgesConsumer = vertex -> contractionGraph
                .outgoingEdgesOf(vertex).forEach(
                        e -> e.isUpward =
                                contractionGraph.getEdgeSource(e).contractionLevel < contractionGraph
                                        .getEdgeTarget(e).contractionLevel);
    }

    /**
     * Computes contraction hierarchy for {@code graph}.
     *
     * <p>
     * The steps are: fill the contraction graph from the original graph, compute the initial
     * priorities of all vertices, contract the vertices, and finally mark the upward edges.
     *
     * @return contraction hierarchy and mapping of original to contracted vertices
     */
    public ContractionHierarchy<V, E> computeContractionHierarchy()
    {
        fillContractionGraphAndVerticesArray();

        // initial priorities of all vertices, computed in parallel
        submitTasks(0, contractionGraph.vertexSet().size(), computeInitialPrioritiesConsumers);

        contractVertices();

        // direction flags of all edges, computed in parallel
        submitTasks(0, contractionGraph.vertexSet().size(), markUpwardEdgesConsumer);

        ContractionHierarchy<V, E> hierarchy =
            new ContractionHierarchy<>(graph, contractionGraph, contractionMapping);
        return hierarchy;
    }

    /**
     * Populates {@code contractionGraph}, {@code vertices} and {@code contractionMapping} from the
     * original graph. Vertex ids are assigned consecutively, starting at 0, in the iteration order
     * of the original vertex set. When several edges connect the same pair of vertices only the
     * lightest one is kept. Self loops are skipped. For an undirected original graph every edge
     * is represented by two opposite directed edges in the contraction graph.
     */
    private void fillContractionGraphAndVerticesArray()
    {
        int nextId = 0;
        for (V original : graph.vertexSet()) {
            ContractionVertex<V> wrapped = new ContractionVertex<>(original, nextId++);
            vertices.add(wrapped);
            contractionGraph.addVertex(wrapped);
            contractionMapping.put(original, wrapped);
        }

        for (E e : graph.edgeSet()) {
            V from = graph.getEdgeSource(e);
            V to = graph.getEdgeTarget(e);
            if (from.equals(to)) {
                continue; // self loop
            }

            ContractionVertex<V> wrappedFrom = contractionMapping.get(from);
            ContractionVertex<V> wrappedTo = contractionMapping.get(to);
            double weight = graph.getEdgeWeight(e);

            ContractionEdge<E> existing = contractionGraph.getEdge(wrappedFrom, wrappedTo);
            if (existing != null) {
                // parallel edge: keep whichever of the two is lighter
                if (weight < contractionGraph.getEdgeWeight(existing)) {
                    contractionGraph.setEdgeWeight(existing, weight);
                    existing.edge = e;
                    if (graph.getType().isUndirected()) {
                        ContractionEdge<E> existingReverse =
                            contractionGraph.getEdge(wrappedTo, wrappedFrom);
                        existingReverse.edge = e;
                        contractionGraph.setEdgeWeight(existingReverse, weight);
                    }
                }
                continue;
            }

            ContractionEdge<E> forward = new ContractionEdge<>(e);
            contractionGraph.addEdge(wrappedFrom, wrappedTo, forward);
            contractionGraph.setEdgeWeight(forward, weight);

            if (graph.getType().isUndirected()) {
                ContractionEdge<E> backward = new ContractionEdge<>(e);
                contractionGraph.addEdge(wrappedTo, wrappedFrom, backward);
                contractionGraph.setEdgeWeight(backward, weight);
            }
        }
    }

    /**
     * Performs contraction of vertices in {@code contractionGraph}.
     *
     * <p>
     * Works in rounds on the prefix of {@code vertices} that is not yet contracted. Each round
     * determines the independent vertices, moves them to the end of that prefix, computes and
     * inserts their shortcuts, updates the data of their neighbours and marks them as contracted.
     * The prefix then shrinks to the vertices that were not independent in this round.
     */
    private void contractVertices()
    {
        int remainingEnd = graph.vertexSet().size();

        while (remainingEnd != 0) {
            // independence flags, computed in parallel
            submitTasks(0, remainingEnd, computeIndependentSetConsumer);

            final int firstIndependent = partitionIndependentSet(remainingEnd);

            // shortcuts of the independent vertices, computed in parallel
            submitTasks(firstIndependent, remainingEnd, computeShortcutsConsumer);
            contractIndependentSet(firstIndependent, remainingEnd);
            // priorities of the neighbours, updated in parallel
            submitTasks(firstIndependent, remainingEnd, updateNeighboursConsumer);
            markContracted(firstIndependent, remainingEnd);

            remainingEnd = firstIndependent;
        }
    }

    /**
     * Determines if a {@code vertex} is independent wrt the overlay graph, i.e. whether
     * {@link #isGreater} is false for it against every vertex reachable from it within two hops
     * in {@code maskedContractionGraph}.
     *
     * @param vertex vertex
     * @return true iff vertex is independent
     */
    private boolean vertexIsIndependent(ContractionVertex<V> vertex)
    {
        for (ContractionVertex<V> oneHop : Graphs.neighborSetOf(maskedContractionGraph, vertex)) {
            if (isGreater(vertex, oneHop)) {
                return false;
            }

            for (ContractionVertex<V> twoHops : Graphs
                .neighborSetOf(maskedContractionGraph, oneHop))
            {
                // the vertex itself shows up among the neighbours of its neighbours
                if (!twoHops.equals(vertex) && isGreater(vertex, twoHops)) {
                    return false;
                }
            }
        }

        return true;
    }

    /**
     * Determines if priority of {@code vertex1} is greater than the priority of {@code vertex2}.
     * Priorities stored in {@code verticesData} are compared first. On equal priorities the
     * random values stored in {@code verticesData} break the tie, and if those are equal as well
     * the vertex ids decide. Each vertex has a unique id which guarantees that on each iteration
     * there exists at least one independent vertex.
     *
     * @return true iff priority of {@code vertex1} is greater than {@code vertex2}
     */
    private boolean isGreater(ContractionVertex<V> vertex1, ContractionVertex<V> vertex2)
    {
        final VertexData first = verticesData.get(vertex1.vertexId);
        final VertexData second = verticesData.get(vertex2.vertexId);

        if (first.priority == second.priority) {
            // tie breaking
            if (first.random == second.random) {
                return vertex1.vertexId > vertex2.vertexId;
            }
            return first.random > second.random;
        }

        return first.priority > second.priority;
    }

    /**
     * Partitions vertices in {@code vertices} on the segment $[0,notContractedVerticesEnd)$ so
     * that the not independent vertices come first and the independent ones last. The lists
     * {@code verticesData} and {@code shortcutEdges} are permuted in the same way, and the ids of
     * swapped vertices are exchanged so that an id keeps denoting the position of its vertex.
     *
     * @param notContractedVerticesEnd position after the last not yet contracted vertex in
     *        {@code vertices}
     * @return position of first independent vertex in created partition
     */
    private int partitionIndependentSet(int notContractedVerticesEnd)
    {
        int low = 0;
        int high = notContractedVerticesEnd - 1;

        while (low <= high) {
            // advance to the first independent vertex from the left
            while (!verticesData.get(low).isIndependent) {
                low++;
            }
            // retreat to the last not independent vertex from the right
            while (high >= 0 && verticesData.get(high).isIndependent) {
                high--;
            }
            if (low > high) {
                break; // the two scans have crossed, the segment is partitioned
            }

            ContractionVertex<V> movedRight = vertices.get(low);
            ContractionVertex<V> movedLeft = vertices.get(high);

            Collections.swap(verticesData, low, high);
            Collections.swap(vertices, low, high);
            Collections.swap(shortcutEdges, low, high);

            int idOfMovedRight = movedRight.vertexId;
            movedRight.vertexId = movedLeft.vertexId;
            movedLeft.vertexId = idOfMovedRight;
        }

        return low;
    }

    /**
     * Exchanges the elements stored at positions {@code i} and {@code j} of {@code list}.
     *
     * @param list list
     * @param i position of first element
     * @param j position of second element
     */
    private <T> void swap(List<T> list, int i, int j)
    {
        // List#set returns the element it replaced, which is exactly what has to go to position i
        list.set(i, list.set(j, list.get(i)));
    }

    /**
     * Contracts vertices in the current independent set, giving each one the next value of
     * {@code contractionLevelCounter} as its contraction level. This step should be performed
     * sequentially because the {@code contractionGraph} is not thread-safe.
     *
     * @param independentSetStart first vertex in the independent set
     * @param independentSetEnd position after the last vertex in the independent set
     */
    private void contractIndependentSet(int independentSetStart, int independentSetEnd)
    {
        List<ContractionVertex<V>> independentSet =
            vertices.subList(independentSetStart, independentSetEnd);
        for (ContractionVertex<V> independent : independentSet) {
            contractVertex(independent, contractionLevelCounter.getAndIncrement());
        }
    }

    /**
     * Contracts provided {@code vertex} and assigns the specified {@code contractionLevel} to it.
     *
     * <p>
     * Every shortcut stored for the vertex in {@code shortcutEdges} is inserted into
     * {@code contractionGraph} with the summed weight of the two bypassed edges. If the graph
     * refuses the insertion, the edge already connecting the two end points is turned into the
     * shortcut instead.
     *
     * @param vertex vertex to contract
     * @param contractionLevel vertex contraction level
     */
    private void contractVertex(ContractionVertex<V> vertex, int contractionLevel)
    {
        for (Pair<ContractionEdge<E>, ContractionEdge<E>> bypassed : this.shortcutEdges
            .get(vertex.vertexId))
        {
            ContractionEdge<E> inEdge = bypassed.getFirst();
            ContractionEdge<E> outEdge = bypassed.getSecond();

            ContractionVertex<V> from = maskedContractionGraph.getEdgeSource(inEdge);
            ContractionVertex<V> to = maskedContractionGraph.getEdgeTarget(outEdge);
            ContractionEdge<E> newEdge = new ContractionEdge<>(bypassed);

            double weight = maskedContractionGraph.getEdgeWeight(inEdge)
                + maskedContractionGraph.getEdgeWeight(outEdge);

            if (contractionGraph.addEdge(from, to, newEdge)) {
                contractionGraph.setEdgeWeight(newEdge, weight);
                continue;
            }

            // an edge between the end points exists already: repurpose it as the shortcut
            ContractionEdge<E> existing = contractionGraph.getEdge(from, to);
            existing.edge = null;
            existing.bypassedEdges = bypassed;
            existing.originalEdges = inEdge.originalEdges + outEdge.originalEdges;
            contractionGraph.setEdgeWeight(existing, weight);
        }

        vertex.contractionLevel = contractionLevel;
    }

    /**
     * Updates neighbours priorities and their {@code depth} values for a given {@code vertex}.
     * The depth of a neighbour becomes at least the depth of {@code vertex} plus one. Method
     * {@link Graphs#neighborSetOf(Graph, Object)} is used to traverse neighbours.
     *
     * @param vertex a vertex in the {@code contractionGraph}
     */
    private void updateNeighboursData(ContractionVertex<V> vertex)
    {
        final VertexData ownData = verticesData.get(vertex.vertexId);
        Set<ContractionVertex<V>> adjacentVertices =
            Graphs.neighborSetOf(maskedContractionGraph, vertex);

        for (ContractionVertex<V> adjacent : adjacentVertices) {
            VertexData adjacentData = verticesData.get(adjacent.vertexId);
            adjacentData.depth = Math.max(ownData.depth + 1, adjacentData.depth);
            updatePriority(adjacent, adjacentData);
        }
    }

    /**
     * Builds a fresh {@code VertexData} for {@code vertex} from the given random number and
     * computes its {@code priority} right away.
     *
     * @param vertex a vertex in {@code contractionGraph}
     * @param random random number
     * @return created {@code VertexData}
     */
    private VertexData getVertexData(ContractionVertex<V> vertex, int random)
    {
        final VertexData created = new VertexData(random);
        updatePriority(vertex, created);
        return created;
    }

    /**
     * Updates {@code priority} field value of {@code data}, which corresponds to the
     * {@code vertex}.
     *
     * <p>
     * If contracting the vertex would remove no contraction edges or no original edges, the
     * priority is just the depth of the vertex. Otherwise it is the weighted sum
     * {@code 4 * (added / removed contraction edges) + 2 * (added / removed original edges) +
     * depth}.
     *
     * @param vertex a vertex in the {@code contractionGraph}
     * @param data data of vertex
     */
    private void updatePriority(ContractionVertex<V> vertex, VertexData data)
    {
        VertexStatistics statistics = getStatistics(vertex);
        // Each counter is tested on its own. Testing their product against zero is not
        // equivalent when the counters are of an integral type: the product may wrap around to
        // exactly 0 (e.g. 65536 * 65536 for ints) although neither factor is 0, which would
        // silently discard both quotients for a vertex of very high degree.
        if (statistics.removedContractionEdges == 0 || statistics.removedOriginalEdges == 0) {
            data.priority = data.depth;
        } else {
            data.priority =
                4.0 * statistics.addedContractionEdges / statistics.removedContractionEdges
                    + 2.0 * statistics.addedOriginalEdges / statistics.removedOriginalEdges
                    + 1.0 * data.depth;
        }
    }

    /**
     * Computes statistics for specified {@code vertex}. The shortcuts of the vertex are fed into
     * a {@code ToStatisticsConsumer}; afterwards every edge touching the vertex in
     * {@code maskedContractionGraph} is counted as a removed contraction edge and its
     * {@code originalEdges} value is added to the removed original edges.
     *
     * @param vertex a vertex in the {@code contractionGraph}
     * @return statistics of {@code vertex}
     */
    private VertexStatistics getStatistics(ContractionVertex<V> vertex)
    {
        final ToStatisticsConsumer collector = new ToStatisticsConsumer();
        iterateShortcutEdges(vertex, collector);

        for (ContractionEdge<E> incident : maskedContractionGraph.edgesOf(vertex)) {
            collector.statistics.removedContractionEdges++;
            collector.statistics.removedOriginalEdges += incident.originalEdges;
        }

        return collector.statistics;
    }

    /**
     * Collects the shortcuts of {@code vertex} wrt the overlay graph by feeding them into a
     * {@code ToListConsumer}.
     *
     * @param vertex a vertex in {@code contractionGraph}
     * @return list of shortcuts
     */
    private List<Pair<ContractionEdge<E>, ContractionEdge<E>>> getShortcuts(
        ContractionVertex<V> vertex)
    {
        final ToListConsumer collector = new ToListConsumer();
        iterateShortcutEdges(vertex, collector);
        return collector.shortcuts;
    }

    /**
     * Runs forward shortest-path searches in current overlay graph to find shortcuts of
     * {@code vertex}. The {@code vertex} itself is ignored. Applies {@code shortcutConsumer}
     * whenever a new shortcut is found. To prune the search, keeps track of the value $d(u, v) +
     * max {c(v, w) : (v, w) \in E^{\prime}}$, where $d(u,v)$ denotes distance between vertex $u$
     * and $v$, $c(v,w)$ is weight of the edge $(v,w)$, $E^{\prime}$ is the set of edges of the
     * overlay graph. If the original graph is undirected each predecessor of {@code vertex} is
     * considered only once and for each found shortcut in the forward direction another one in the
     * backward direction is generated.
     *
     * <p>
     * A shortcut for the pair (predecessor, successor) is reported when the search started at the
     * predecessor either did not reach the successor or reached it with a distance strictly
     * greater than the weight of the path through {@code vertex}.
     *
     * @param vertex a vertex in {@code contractionGraph}
     * @param shortcutConsumer consumer to supply shortcuts to; receives the incoming edge of
     *        {@code vertex} first and the outgoing edge second
     */
    private void iterateShortcutEdges(
        ContractionVertex<V> vertex,
        BiConsumer<ContractionEdge<E>, ContractionEdge<E>> shortcutConsumer)
    {
        // step 1: collect the successors of the vertex and the heaviest outgoing edge weight
        Set<ContractionVertex<V>> successors = new HashSet<>();
        // NOTE(review): Double.MIN_VALUE is the smallest positive double, not the most negative
        // one, so as a seed of a running maximum it acts as a value just above 0 - confirm that
        // this is intended (it is harmless as long as edge weights are non-negative).
        double maxOutgoingEdgeWeight = Double.MIN_VALUE;

        for (ContractionEdge<E> outEdge : maskedContractionGraph.outgoingEdgesOf(vertex)) {
            ContractionVertex<V> successor = maskedContractionGraph.getEdgeTarget(outEdge);

            if (verticesData.get(successor.vertexId) != null
                && verticesData.get(successor.vertexId).isIndependent)
            { // does not belong to overlay graph
                continue;
            }

            successors.add(successor);
            maxOutgoingEdgeWeight =
                Math.max(maxOutgoingEdgeWeight, contractionGraph.getEdgeWeight(outEdge));
        }

        // step 2: for every predecessor search for paths to the successors that avoid the vertex
        for (ContractionEdge<E> inEdge : maskedContractionGraph.incomingEdgesOf(vertex)) {
            ContractionVertex<V> predecessor = contractionGraph.getEdgeSource(inEdge);
            if (verticesData.get(predecessor.vertexId) != null
                && verticesData.get(predecessor.vertexId).isIndependent)
            { // does not belong to overlay graph
                continue;
            }

            // the predecessor may be a successor as well; it is taken out of the target set so
            // that no shortcut from the predecessor to itself is considered
            boolean containedPredecessor = successors.remove(predecessor); // might contain the
                                                                           // predecessor vertex

            // the search radius is the weight of the incoming edge plus the heaviest outgoing
            // edge, i.e. an upper bound on the weight of any path predecessor -> vertex ->
            // successor
            Map<ContractionVertex<V>,
                AddressableHeap.Handle<Double, ContractionVertex<V>>> distances =
                    iterateToSuccessors(
                        maskedContractionGraph, predecessor, successors, vertex,
                        contractionGraph.getEdgeWeight(inEdge) + maxOutgoingEdgeWeight);

            for (ContractionVertex<V> successor : successors) {
                ContractionEdge<E> outEdge = contractionGraph.getEdge(vertex, successor);
                double pathWeight = contractionGraph.getEdgeWeight(inEdge)
                    + contractionGraph.getEdgeWeight(outEdge);

                // a shortcut is needed if the successor was not reached or was reached only by
                // a strictly longer path
                if (!distances.containsKey(successor)
                    || distances.get(successor).getKey() > pathWeight)
                {
                    shortcutConsumer.accept(inEdge, outEdge);
                    if (graph.getType().isUndirected()) {
                        // mirrored shortcut successor -> vertex -> predecessor
                        shortcutConsumer
                            .accept(
                                contractionGraph.getEdge(successor, vertex),
                                contractionGraph.getEdge(vertex, predecessor));
                    }
                }
            }

            // for undirected graphs the predecessor stays removed, so that every unordered pair
            // of neighbours is processed only once
            if (containedPredecessor && graph.getType().isDirected()) { // restore predecessor if
                                                                        // needed
                successors.add(predecessor);
            }
        }
    }

    /**
     * Runs a Dijkstra search in {@code graph} from {@code source} which never enters
     * {@code vertexToIgnore}. The search stops as soon as the closest unsettled vertex is farther
     * away than {@code radius}, every vertex in {@code successors} has been settled, or the
     * priority queue runs empty.
     *
     * @param graph graph to traverse
     * @param source search start vertex
     * @param successors vertices to reach
     * @param vertexToIgnore vertex to ignore
     * @param radius search distance limit
     * @return heap handles of all vertices discovered by the search, keyed by vertex; the key of a
     *         handle is the distance computed for its vertex
     */
    private Map<ContractionVertex<V>,
        AddressableHeap.Handle<Double, ContractionVertex<V>>> iterateToSuccessors(
            Graph<ContractionVertex<V>, ContractionEdge<E>> graph, ContractionVertex<V> source,
            Set<ContractionVertex<V>> successors, ContractionVertex<V> vertexToIgnore,
            double radius)
    {
        AddressableHeap<Double, ContractionVertex<V>> queue = shortcutsSearchHeapSupplier.get();
        Map<ContractionVertex<V>,
            AddressableHeap.Handle<Double, ContractionVertex<V>>> reached = new HashMap<>();

        updateDistance(source, 0.0, queue, reached);

        // number of target vertices which are not settled yet
        int pendingSuccessors = successors.size();

        while (!queue.isEmpty()) {
            AddressableHeap.Handle<Double, ContractionVertex<V>> closest = queue.deleteMin();
            double settledDistance = closest.getKey();

            if (settledDistance > radius) {
                // everything still in the queue is outside of the search radius
                break;
            }

            ContractionVertex<V> settled = closest.getValue();

            if (successors.contains(settled)) {
                --pendingSuccessors;
                if (pendingSuccessors == 0) {
                    // all targets are settled
                    break;
                }
            }

            relaxNode(graph, queue, reached, settled, settledDistance, vertexToIgnore);
        }

        return reached;
    }

    /**
     * Relaxes every outgoing edge of {@code vertex} in {@code graph}. Edges leading to
     * {@code vertexToIgnore} or to a vertex marked as independent are skipped.
     *
     * @param graph graph
     * @param heap search priority queue
     * @param distanceMap vertex distances
     * @param vertex vertex to relax
     * @param vertexDistance distance computed for {@code vertex}
     * @param vertexToIgnore vertex to ignore
     * @throws IllegalArgumentException if an outgoing edge of {@code vertex} has negative weight
     */
    private void relaxNode(
        Graph<ContractionVertex<V>, ContractionEdge<E>> graph,
        AddressableHeap<Double, ContractionVertex<V>> heap,
        Map<ContractionVertex<V>, AddressableHeap.Handle<Double, ContractionVertex<V>>> distanceMap,
        ContractionVertex<V> vertex, double vertexDistance, ContractionVertex<V> vertexToIgnore)
    {
        for (ContractionEdge<E> outgoing : graph.outgoingEdgesOf(vertex)) {
            ContractionVertex<V> target = graph.getEdgeTarget(outgoing);
            double weight = graph.getEdgeWeight(outgoing);

            // the weight is validated even for edges which are skipped below
            if (weight < 0) {
                throw new IllegalArgumentException("Negative edge weight not allowed");
            }

            if (target.equals(vertexToIgnore)) {
                continue;
            }

            boolean inOverlayGraph = verticesData.get(target.vertexId) == null
                || !verticesData.get(target.vertexId).isIndependent;
            if (!inOverlayGraph) {
                // independent vertices do not belong to overlay graph
                continue;
            }

            updateDistance(target, vertexDistance + weight, heap, distanceMap);
        }
    }

    /**
     * Records {@code distance} for {@code vertex}. A vertex seen for the first time is inserted
     * into the {@code heap} and its handle is stored in {@code distanceMap}; for a known vertex
     * the key is decreased if {@code distance} is smaller than the stored one.
     *
     * @param vertex vertex
     * @param distance updated distance
     * @param heap search priority queue
     * @param distanceMap vertex distances
     */
    private void updateDistance(
        ContractionVertex<V> vertex, double distance,
        AddressableHeap<Double, ContractionVertex<V>> heap,
        Map<ContractionVertex<V>, AddressableHeap.Handle<Double, ContractionVertex<V>>> distanceMap)
    {
        AddressableHeap.Handle<Double, ContractionVertex<V>> handle = distanceMap.get(vertex);

        if (handle != null) {
            // known vertex: only an improvement changes anything
            if (distance < handle.getKey()) {
                handle.decreaseKey(distance);
            }
            return;
        }

        // first time this vertex is discovered
        handle = heap.insert(distance, vertex);
        distanceMap.put(vertex, handle);
    }

    /**
     * Flags every vertex in the segment $[independentSetStart,independentSetEnd)$ of
     * {@code vertices} as contracted by setting the {@code isContracted} field of its
     * {@code VertexData} to $true$. This step should not interfere with other steps during the
     * contraction because it alters the {@code maskedContractionGraph}.
     *
     * @param independentSetStart start of independent set
     * @param independentSetEnd end of independent set
     */
    private void markContracted(int independentSetStart, int independentSetEnd)
    {
        int position = independentSetStart;
        while (position < independentSetEnd) {
            ContractionVertex<V> contracted = vertices.get(position);
            verticesData.get(contracted.vertexId).isContracted = true;
            ++position;
        }
    }

    /**
     * Configures every task in {@code tasks} with the working segment and the shared
     * {@code consumer}, submits all of them to the {@code completionService} and waits until they
     * are completed.
     *
     * @param segmentStart start of working segment inclusively
     * @param segmentEnd end of working segment exclusively
     * @param consumer consumer
     */
    private void submitTasks(
        int segmentStart, int segmentEnd, Consumer<ContractionVertex<V>> consumer)
    {
        int numOfTasks = tasks.size();
        for (int i = 0; i < numOfTasks; ++i) {
            ContractionTask task = tasks.get(i);
            task.segmentStart = segmentStart;
            task.segmentsEnd = segmentEnd;
            task.consumer = consumer;
            completionService.submit(task, null);
        }
        waitForTasksCompletion(tasks.size());
    }

    /**
     * Configures every task in {@code tasks} with the working segment and its own consumer taken
     * from the same position in {@code consumers}, submits all of them to the
     * {@code completionService} and waits until they are completed.
     *
     * @param segmentStart start of working segment inclusively
     * @param segmentEnd end of working segment exclusively
     * @param consumers consumers
     */
    private void submitTasks(
        int segmentStart, int segmentEnd, List<Consumer<ContractionVertex<V>>> consumers)
    {
        int position = 0;
        for (ContractionTask task : tasks) {
            task.consumer = consumers.get(position);
            task.segmentStart = segmentStart;
            task.segmentsEnd = segmentEnd;
            completionService.submit(task, null);
            ++position;
        }
        waitForTasksCompletion(tasks.size());
    }

    /**
     * Takes {@code numOfTasks} tasks from the {@link #completionService}.
     *
     * <p>
     * The wait is not abandoned when the calling thread is interrupted: the method keeps waiting
     * until all {@code numOfTasks} tasks are taken, so that no worker is still running and no
     * finished task is left in the {@code completionService} when it returns. The interrupt status
     * of the calling thread is restored before returning.
     *
     * @param numOfTasks number of tasks
     * @throws RuntimeException if a task has failed; the failure of the first such task taken is
     *         rethrown after all tasks have been taken
     */
    private void waitForTasksCompletion(int numOfTasks)
    {
        boolean interrupted = false;
        Throwable firstFailure = null;

        int taken = 0;
        while (taken < numOfTasks) {
            java.util.concurrent.Future<?> finished;
            try {
                finished = completionService.take();
            } catch (InterruptedException e) {
                // no task has been taken, remember the interrupt and wait again
                interrupted = true;
                continue;
            }
            ++taken;

            try {
                finished.get();
            } catch (InterruptedException e) {
                interrupted = true;
            } catch (ExecutionException e) {
                if (firstFailure == null) {
                    firstFailure = e.getCause() != null ? e.getCause() : e;
                }
            }
        }

        if (interrupted) {
            // hand the interrupt over to the caller instead of swallowing it
            Thread.currentThread().interrupt();
        }

        // a failed task means that the computed data is incomplete, do not continue silently
        if (firstFailure instanceof RuntimeException) {
            throw (RuntimeException) firstFailure;
        }
        if (firstFailure instanceof Error) {
            throw (Error) firstFailure;
        }
        if (firstFailure != null) {
            throw new IllegalStateException("Contraction task failed", firstFailure);
        }
    }


    /**
     * Result of this algorithm. Bundles the input graph, the {@code contractionGraph} and the
     * {@code contractionMapping}, and provides utilities to unpack edges of the hierarchy.
     *
     * @param <V> the graph vertex type
     * @param <E> the graph edge type
     */
    public static class ContractionHierarchy<V, E>
    {
        /**
         * The underlying graph.
         */
        private Graph<V, E> graph;
        /**
         * Graph that stores the computed contraction hierarchy.
         */
        private Graph<ContractionVertex<V>, ContractionEdge<E>> contractionGraph;
        /**
         * Mapping of the vertices in the original graph to the vertices in the contraction
         * hierarchy graph.
         */
        private Map<V, ContractionVertex<V>> contractionMapping;

        /**
         * Constructs a new instance for the given {@code graph}, {@code contractionGraph} and
         * {@code contractionMapping}.
         *
         * @param graph graph
         * @param contractionGraph contracted graph
         * @param contractionMapping vertices mapping
         */
        ContractionHierarchy(
            Graph<V, E> graph, Graph<ContractionVertex<V>, ContractionEdge<E>> contractionGraph,
            Map<V, ContractionVertex<V>> contractionMapping)
        {
            this.graph = graph;
            this.contractionGraph = contractionGraph;
            this.contractionMapping = contractionMapping;
        }

        /**
         * Returns the underlying graph of this contraction hierarchy.
         *
         * @return underlying graph of this contraction hierarchy
         */
        public Graph<V, E> getGraph()
        {
            return this.graph;
        }

        /**
         * Returns contracted graph.
         *
         * @return contracted graph
         */
        public Graph<ContractionVertex<V>, ContractionEdge<E>> getContractionGraph()
        {
            return this.contractionGraph;
        }

        /**
         * Returns mapping of the vertices in the original graph to the vertices in the contracted
         * graph.
         *
         * @return vertices mapping
         */
        public Map<V, ContractionVertex<V>> getContractionMapping()
        {
            return this.contractionMapping;
        }

        /**
         * Unpacks {@code edge} by recursively going from target to source. For every original
         * edge reached, the original vertex of its source and the original edge itself are
         * prepended to the lists.
         *
         * @param edge edge to unpack
         * @param vertexList vertex list of the path
         * @param edgeList edge list of the path
         */
        public void unpackBackward(
            ContractionEdge<E> edge, LinkedList<V> vertexList, LinkedList<E> edgeList)
        {
            if (edge.bypassedEdges != null) {
                // shortcut: the second bypassed edge is unpacked before the first one
                unpackBackward(edge.bypassedEdges.getSecond(), vertexList, edgeList);
                unpackBackward(edge.bypassedEdges.getFirst(), vertexList, edgeList);
                return;
            }

            V sourceVertex = contractionGraph.getEdgeSource(edge).vertex;
            vertexList.addFirst(sourceVertex);
            edgeList.addFirst(edge.edge);
        }

        /**
         * Unpacks {@code edge} by recursively going from source to target. For every original
         * edge reached, the original vertex of its target and the original edge itself are
         * appended to the lists.
         *
         * @param edge edge to unpack
         * @param vertexList vertex list of the path
         * @param edgeList edge list of the path
         */
        public void unpackForward(
            ContractionEdge<E> edge, LinkedList<V> vertexList, LinkedList<E> edgeList)
        {
            if (edge.bypassedEdges != null) {
                // shortcut: the first bypassed edge is unpacked before the second one
                unpackForward(edge.bypassedEdges.getFirst(), vertexList, edgeList);
                unpackForward(edge.bypassedEdges.getSecond(), vertexList, edgeList);
                return;
            }

            V targetVertex = contractionGraph.getEdgeTarget(edge).vertex;
            vertexList.addLast(targetVertex);
            edgeList.addLast(edge.edge);
        }

    }

    /**
     * Vertex of the contraction hierarchy graph. It wraps an original vertex from {@code graph};
     * two instances are equal if and only if their original vertices are equal.
     *
     * @param <V1> type of the original vertex.
     */
    public static class ContractionVertex<V1>
    {
        /**
         * Identifies the position in {@code verticesData} and {@code shortcutEdges} lists, that
         * corresponds to this vertex.
         */
        int vertexId;
        /**
         * Original vertex from {@code graph} this instance represents.
         */
        V1 vertex;
        /**
         * Level that is assigned to this vertex during contraction which is used to determine
         * upward edges in the hierarchy.
         */
        int contractionLevel;

        /**
         * Constructs a new vertex for given original vertex {@code vertex} and {@code vertexId}.
         *
         * @param vertex vertex in {@code graph}
         * @param vertexId id
         */
        ContractionVertex(V1 vertex, int vertexId)
        {
            this.vertex = vertex;
            this.vertexId = vertexId;
        }

        /**
         * Compares the wrapped original vertices; {@code vertexId} and {@code contractionLevel}
         * do not take part in the comparison.
         */
        @Override
        public boolean equals(Object o)
        {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            ContractionVertex<?> other = (ContractionVertex<?>) o;
            return Objects.equals(this.vertex, other.vertex);
        }

        /**
         * Hash code computed from the wrapped original vertex only, consistent with
         * {@link #equals(Object)}.
         */
        @Override
        public int hashCode()
        {
            // same value as Objects.hash(vertex)
            return 31 + Objects.hashCode(vertex);
        }
    }

    /**
     * Edge of the contraction hierarchy graph. An instance either wraps an original edge from
     * {@code graph} or, in case it represents a shortcut, holds the pair of edges it bypasses.
     *
     * @param <E1> type of the original edge.
     */
    public static class ContractionEdge<E1>
    {
        /**
         * Original edge in {@code graph}.
         */
        E1 edge;
        /**
         * Pair of edges this edge bypasses in case it is a shortcut.
         */
        Pair<ContractionEdge<E1>, ContractionEdge<E1>> bypassedEdges;
        /**
         * Determines if this edge source has lower contraction level than its target.
         */
        boolean isUpward;
        /**
         * Number of original edges in {@code graph} this edge represents in the contraction
         * hierarchy.
         */
        int originalEdges;

        /**
         * Constructs a contraction edge for the given original {@code edge}. Such an edge
         * represents exactly one original edge.
         *
         * @param edge an edge in {@code graph}
         */
        ContractionEdge(E1 edge)
        {
            this.originalEdges = 1;
            this.edge = edge;
        }

        /**
         * Constructs a contraction edge for the given pair of bypassed edges. The number of
         * original edges it represents is the sum over both bypassed edges.
         *
         * @param bypassedEdges skipped edges
         */
        ContractionEdge(Pair<ContractionEdge<E1>, ContractionEdge<E1>> bypassedEdges)
        {
            this.bypassedEdges = bypassedEdges;

            ContractionEdge<E1> first = bypassedEdges.getFirst();
            ContractionEdge<E1> second = bypassedEdges.getSecond();
            this.originalEdges = first.originalEdges + second.originalEdges;
        }
    }

    /**
     * Shortcut consumer which collects every passed shortcut into a list.
     */
    private class ToListConsumer
        implements
        BiConsumer<ContractionEdge<E>, ContractionEdge<E>>
    {
        /**
         * Resulting list of shortcuts, each stored as the pair of edges it bypasses.
         */
        List<Pair<ContractionEdge<E>, ContractionEdge<E>>> shortcuts;

        /**
         * Constructs an instance of the consumer with an empty list of shortcuts.
         */
        ToListConsumer()
        {
            this.shortcuts = new ArrayList<>();
        }

        @Override
        public void accept(ContractionEdge<E> e1, ContractionEdge<E> e2)
        {
            Pair<ContractionEdge<E>, ContractionEdge<E>> shortcut = Pair.of(e1, e2);
            shortcuts.add(shortcut);
        }
    }

    /**
     * Shortcut consumer which only accumulates the {@code addedContractionEdges} and
     * {@code addedOriginalEdges} statistics from the passed shortcuts. This consumer is used to
     * run $\textit{simulative}$ contraction - a type of contraction used to compute only the
     * vertex priority.
     */
    private class ToStatisticsConsumer
        implements
        BiConsumer<ContractionEdge<E>, ContractionEdge<E>>
    {
        /**
         * Resulting statistics instance.
         */
        VertexStatistics statistics;

        /**
         * Constructs an instance of the consumer with fresh statistics.
         */
        ToStatisticsConsumer()
        {
            statistics = new VertexStatistics();
        }

        @Override
        public void accept(ContractionEdge<E> e1, ContractionEdge<E> e2)
        {
            // a shortcut represents as many original edges as both bypassed edges together
            int representedOriginalEdges = e1.originalEdges + e2.originalEdges;

            statistics.addedContractionEdges += 1;
            statistics.addedOriginalEdges += representedOriginalEdges;
        }
    }

    /**
     * Task that is used to perform computing of initial priorities, independent set and shortcuts,
     * updating neighbours priorities and marking upward edges. To achieve good load balancing
     * segment of vertices in {@code vertices} is divided into chunks using {@code taskId}.
     */
    private class ContractionTask
        implements
        Runnable
    {
        /**
         * Id of this task.
         */
        int taskId;
        /**
         * Start of the working segment in {@code vertices} inclusively.
         */
        int segmentStart;
        /**
         * End of the working segment in {@code vertices} exclusively.
         */
        int segmentsEnd;
        /**
         * Performs needed action with vertices.
         */
        Consumer<ContractionVertex<V>> consumer;

        /**
         * Constructs an instance of the task for the given {@code taskId}.
         *
         * @param taskId id of this task
         */
        public ContractionTask(int taskId)
        {
            this.taskId = taskId;
        }

        /**
         * Passes every vertex in the chunk of the working segment assigned to this task to the
         * {@code consumer}.
         */
        @Override
        public void run()
        {
            int start = workerSegmentStart(segmentStart, segmentsEnd);
            int end = workerSegmentEnd(segmentStart, segmentsEnd);
            for (int i = start; i < end; ++i) {
                consumer.accept(vertices.get(i));
            }
        }

        /**
         * Computes start of the working chunk for this task.
         *
         * @param segmentStart working segment start
         * @param segmentEnd working segment end
         * @return working chunk start
         */
        private int workerSegmentStart(int segmentStart, int segmentEnd)
        {
            return chunkBoundary(segmentStart, segmentEnd, taskId);
        }

        /**
         * Computes end of the working chunk for this task.
         *
         * @param segmentStart working segment start
         * @param segmentEnd working segment end
         * @return working chunk end
         */
        private int workerSegmentEnd(int segmentStart, int segmentEnd)
        {
            return chunkBoundary(segmentStart, segmentEnd, taskId + 1);
        }

        /**
         * Computes the position at which the chunk with index {@code chunkIndex} starts when the
         * working segment is divided by {@code parallelism}. The product of the segment length
         * and the chunk index is computed using {@code long} arithmetic because it can exceed
         * the range of {@code int} for long segments.
         *
         * @param segmentStart working segment start
         * @param segmentEnd working segment end
         * @param chunkIndex index of the chunk
         * @return start of the chunk with index {@code chunkIndex}
         */
        private int chunkBoundary(int segmentStart, int segmentEnd, int chunkIndex)
        {
            long segmentLength = (long) segmentEnd - segmentStart;
            return (int) (segmentStart + (segmentLength * chunkIndex) / parallelism);
        }
    }

    /**
     * Mutable per-vertex state which is needed during the contraction.
     */
    private static class VertexData
    {
        /**
         * Hierarchical depth of a vertex measured in the number of hops that can be performed while
         * descending into the lower levels of the hierarchy.
         */
        int depth;
        /**
         * Random number used for tie breaking during computing independent set.
         */
        int random;
        /**
         * Priority of a vertex.
         */
        double priority;

        /**
         * Determines if a vertex is already contracted or not.
         */
        boolean isContracted;
        /**
         * Determines if a vertex is independent or not.
         */
        boolean isIndependent;

        /**
         * Constructs an instance of data for given random value. All other fields keep their
         * default values.
         *
         * @param random random number
         */
        VertexData(int random)
        {
            super();
            this.random = random;
        }
    }

    /**
     * Statistics of a vertex in {@code contractionGraph} which are needed to compute its
     * priority. All counters start at zero.
     */
    private static class VertexStatistics
    {
        /**
         * Number of edges added to the {@code contractionGraph} in case this vertex is contracted.
         */
        int addedContractionEdges;
        /**
         * Sum of the complexities of edges added to the {@code contractionGraph} in case this
         * vertex is contracted. The complexity of an edge is the number of edges it represents in
         * the original {@code graph}.
         */
        int addedOriginalEdges;

        /**
         * Number of edges removed from the {@code contractionGraph} in case this vertex is
         * contracted.
         */
        int removedContractionEdges;
        /**
         * Sum of the complexities of edges removed from the {@code contractionGraph} in case this
         * vertex is contracted. The complexity of an edge is the number of edges it represents in
         * the original {@code graph}.
         */
        int removedOriginalEdges;
    }
}
